package com.bieber.server;

import com.bieber.common.Constants;
import com.bieber.common.Node;
import com.bieber.server.config.ServerConfig;
import com.bieber.server.executor.NamedThreadFactory;
import com.bieber.server.pack.FilePackage;
import com.bieber.server.pack.FileStatus;
import com.bieber.server.remote.Client;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by bieber on 2015/8/19.
 */
public class Sender implements Runnable{


    private ServerConfig config;

    private List<Client> clients = new ArrayList<Client>();

    private File readRoot;

    private List<Node> nodes;

    private Lock nodeLock = new ReentrantLock();

    //表示当前channel同时可以发送多少个文件，当前只能是一个
    private Semaphore concurrentSendSemaphore = new Semaphore(1);

    private static final ExecutorService SUB_FILE_SENDER_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),new NamedThreadFactory("TRANSPORTER-SUB-FILE-SENDER"));

    private volatile  boolean isWorking=false;

    public Sender(ServerConfig config, List<Node> nodes){
        this.config = config;
        readRoot = new File(config.getSendFileDir());
        this.nodes = nodes;
        for(Node node:nodes){
            Client client=new Client(config,node);
            client.initialize();
            client.start();
            clients.add(client);
        }
    }

   public void unregisterNode(Node node){
       nodeLock.lock();
       try{
           int index = this.nodes.indexOf(node);
           if(index>=0){
               this.nodes.get(index).setAvailable(false);
           }
           this.nodes.remove(node);
       }finally {
           nodeLock.unlock();
       }
   }

    @Override
    public void run() {
        if(nodes.size()==0||!Transporter.isRunning()){
            return;
        }
        isWorking=true;
        try{
            File[] files = readRoot.listFiles();
            for(final File file:files){
                try {
                    concurrentSendSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException("failed to get send semaphore for file "+file.getName());
                }
                //如果当前节点关闭状态，那么将停止传输文件
                if(!Transporter.isRunning()){
                    break;
                }
                if(file.isDirectory()){
                    continue;
                }
                startSendFile(file);
                long fileSize = file.length();
                long fileChunks = fileSize/config.getSubFileSize();
                if(fileSize%config.getSubFileSize()!=0){
                    fileChunks++;
                }
                long endIndex=0;
                Constants.COMMON_LOGGER.info("had cut [{}] sub files for send to all nodes",fileChunks);
                final AtomicLong unDoneCounter = new AtomicLong(fileChunks);
                for(int i=0;i<fileChunks;i++){
                    if(i==fileChunks-1){
                        endIndex=fileSize;
                    }else if(i==0){
                        endIndex=(i+1)*config.getSubFileSize();
                    }else{
                        endIndex=(i+1)*config.getSubFileSize();
                    }
                    SubFileSender subFileReader = new SubFileSender(i * config.getSubFileSize(), endIndex, file, config, new CallBack() {
                        @Override
                        public void done() {
                            long currentIndex = unDoneCounter.decrementAndGet();
                            if(currentIndex==0){
                                finishedSendFile(file);
                            }
                        }
                    });
                    SUB_FILE_SENDER_EXECUTOR.submit(subFileReader);
                }
                if(fileChunks==0){
                    finishedSendFile(file);
                }
            }
        }finally {
            isWorking=false;
        }
    }


    interface CallBack {

        public void done();
    }

    class SubFileSender implements Runnable{

        private long startIndex;

        private long endIndex;

        private File file;

        private ServerConfig config;

        private CallBack callBack;

        public SubFileSender(long startIndex, long endIndex, File file, ServerConfig config,CallBack callBack) {
            this.startIndex = startIndex;
            this.endIndex = endIndex;
            this.file = file;
            this.config = config;
            this.callBack = callBack;
            Constants.COMMON_LOGGER.info("create sub file reader [{}]",this);
        }

        @Override
        public void run() {
            sendFileContent();
        }

        private void sendFileContent(){
            RandomAccessFile accessFile=null;
            try {
                accessFile = new RandomAccessFile(file,"r");
                accessFile.seek(startIndex);
                byte[] buffer = new byte[config.getChunkSize()];
                int offset=0;
                long index=startIndex;
                FilePackage filePackage ;
                int chunkSize = config.getChunkSize();
                if(endIndex-index<chunkSize){
                    chunkSize = (int) (endIndex-index);
                }
                while((offset=accessFile.read(buffer,0,chunkSize))>0){
                    filePackage = new FilePackage();
                    filePackage.setFileSize(file.length());
                    filePackage.setFileName(file.getName());
                    filePackage.setStatus(FileStatus.WRITE_CONTENT);
                    filePackage.setNodeName(config.getNodeName());
                    filePackage.setOffset(index);
                    ByteBuffer byteBuffer = ByteBuffer.allocate(offset).put(buffer,0,offset);
                    byteBuffer.flip();
                    filePackage.setContent(byteBuffer);
                    sendToAllNodes(filePackage);
                    index+=offset;
                    if(index==endIndex){
                        break;
                    }
                    if(endIndex-index<chunkSize){
                        chunkSize = (int) (endIndex-index);
                    }
                }
                callBack.done();
            } catch (Exception e) {
                throw new RuntimeException("failed to send sub file content "+this,e);
            }finally {
                if(accessFile!=null){
                    try {
                        accessFile.close();
                    } catch (IOException e) {
                        Constants.COMMON_LOGGER.warn("Failed to read sub file for file "+file.getName(),e);
                    }
                }
            }
        }

        @Override
        public String toString() {
            return "SubFileReader{" +
                    "startIndex=" + startIndex +
                    ", endIndex=" + endIndex +
                    ", file=" + file +
                    '}';
        }
    }

    private void startSendFile(File file){
        Constants.COMMON_LOGGER.info("start send file [{}]",file.getName());
        FilePackage filePackage = new FilePackage();
        filePackage.setStatus(FileStatus.START_WRITE);
        filePackage.setNodeName(config.getNodeName());
        filePackage.setFileSize(file.length());
        filePackage.setFileName(file.getName());
        filePackage.setOffset(0);
        sendToAllNodes(filePackage);
    }
    private void finishedSendFile(File file){
        Constants.COMMON_LOGGER.info("finished send file [{}]",file.getName());
        FilePackage filePackage = new FilePackage();
        filePackage.setStatus(FileStatus.END_WRITE);
        filePackage.setNodeName(config.getNodeName());
        filePackage.setFileSize(file.length());
        filePackage.setFileName(file.getName());
        filePackage.setOffset(file.length());
        sendToAllNodes(filePackage);
        concurrentSendSemaphore.release();
    }


    public boolean isWorking() {
        return isWorking;
    }

    public boolean containNode(Node node){
        return this.nodes.contains(node);
    }


    private void sendToAllNodes(FilePackage filePackage){
        nodeLock.lock();
        try{
            for(Client client:clients){
                if(client.isAvailable()){
                    client.write(filePackage);
                }
            }
        }finally {
            nodeLock.unlock();
        }
    }
}
